iT邦幫忙

2025 iThome 鐵人賽

DAY 22
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 22

Day 22: Join 算子 Part 2 - Sort-Merge Join 和策略選擇

  • 分享至 

  • xImage
  •  

前言

在昨天的文章中,我們探討了 Hash Join 的原理和實現,學習到 Hash Join 透過兩階段模型(Build Phase 和 Probe Phase)實現了 O(N + M) 的時間複雜度,然而 Hash Join 在以下場景效能會大幅下降:

1. 數據已排序

-- 兩個表都已經按 user_id 排序
SELECT * 
FROM sorted_users u
JOIN sorted_orders o ON u.user_id = o.user_id
ORDER BY u.user_id;

如果數據已經排序,使用 Hash Join 意味著我們忽略了這個寶貴的排序信息。我們需要構建 Hash Table,消耗額外記憶體和 CPU 時間。

2. 記憶體嚴重受限

-- 兩個都是大表,記憶體只有 2GB
SELECT * 
FROM huge_table1 t1  -- 10 億行,20 GB
JOIN huge_table2 t2  -- 8 億行,15 GB
ON t1.join_key = t2.join_key;

Hash Join 需要將較小的表(15 GB)完全載入記憶體構建 Hash Table。但如果可用記憶體只有 2GB,系統需要不斷的 spilling,性能急劇下降。

這就是 Sort-Merge Join 發揮作用的地方。今天我們將學習:

  1. Sort-Merge Join 的核心原理和歸併邏輯
  2. Streamed 側和 Buffered 側的運作方式
  3. Nested Loop Join 的簡單實現
  4. DataFusion 如何在不同 Join 策略之間做出選擇
  5. 各種場景下的最優策略

Sort-Merge Join 的核心概念

什麼是 Sort-Merge Join?

Sort-Merge Join 是基於 Merge Sort 的 Join 算法。使用它的前提是:兩個輸入都已按連接鍵排序

基本思想:

已排序的左表:         已排序的右表:
user_id | name        user_id | city
--------+-------      --------+----------
1       | Alice       1       | Taipei
1       | Alice2      2       | Taichung
2       | Bob         2       | Kaohsiung
3       | Carol       4       | Tainan
5       | Dave        

歸併過程:
1. 同時從兩個表的開頭開始掃描
2. 比較當前行的連接鍵
3. 如果相等,產生結果行
4. 如果不等,推進較小值的那一側
5. 重複直到至少一側耗盡

讓我們通過一個例子來理解:

初始狀態:
Left:  [1, 1, 2, 3, 5]  ← L 指針指向第一個 1
Right: [1, 2, 2, 4]     ← R 指針指向第一個 1

步驟 1: 比較 L=1 vs R=1
  ✓ 相等! 產生結果: (1, Alice, Taipei)
  發現 Left 還有另一個 1,也匹配: (1, Alice2, Taipei)
  推進 R 指針(Right 沒有更多 1 了)

步驟 2: L=1(第二個) vs R=2
  L < R,推進 L 指針

步驟 3: L=2 vs R=2
  ✓ 相等! 產生結果: (2, Bob, Taichung)
  Right 還有另一個 2: (2, Bob, Kaohsiung)
  推進 L 和 R 指針

步驟 4: L=3 vs R=4
  L < R,推進 L 指針

步驟 5: L=5 vs R=4
  L > R,推進 R 指針

步驟 6: R 已耗盡,結束

最終結果:
user_id | name   | city
--------+--------+-----------
1       | Alice  | Taipei
1       | Alice2 | Taipei
2       | Bob    | Taichung
2       | Bob    | Kaohsiung

Sort-Merge Join 的優勢

相比 Hash Join 的優勢:

  1. 利用已有排序: 如果輸入已排序,無需額外的預處理步驟
  2. 記憶體友善: 不需要構建龐大的 Hash Table
  3. 流式處理: 可以流式產生結果,無需等待 Build Phase 完成
  4. 保持排序: 輸出自然保持排序,對後續 ORDER BY 有利
  5. 適合大型數據: 可以處理無法放入記憶體的大表

時間複雜度分析:

假設輸入已排序:
  - 掃描左表: O(N)
  - 掃描右表: O(M)
  - 總時間: O(N + M)  ← 與 Hash Join 相同!

假設輸入未排序:
  - 排序左表: O(N log N)
  - 排序右表: O(M log M)
  - 歸併: O(N + M)
  - 總時間: O(N log N + M log M)  ← 排序開銷較大

關鍵洞察:

  • 如果數據已經排序,Sort-Merge Join 可能比 Hash Join 更快
  • 如果數據未排序,排序的開銷通常使 Hash Join 成為更好的選擇

DataFusion 中的 Sort-Merge Join 實現

SortMergeJoinExec 的核心結構

// datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
pub struct SortMergeJoinExec {
    /// 左側輸入(已排序)
    pub left: Arc<dyn ExecutionPlan>,
    
    /// 右側輸入(已排序)
    pub right: Arc<dyn ExecutionPlan>,
    
    /// 連接條件
    pub on: JoinOn,
    
    /// 額外的過濾條件
    pub filter: Option<JoinFilter>,
    
    /// Join 類型
    pub join_type: JoinType,
    
    /// 左側的排序表達式
    left_sort_exprs: LexOrdering,
    
    /// 右側的排序表達式
    right_sort_exprs: LexOrdering,
    
    /// 排序選項(升序/降序、NULL 處理等)
    pub sort_options: Vec<SortOptions>,
    
    // ... 其他欄位
}

關鍵設計決策:

  1. 前提假設: 兩個輸入必須已按連接鍵排序
  2. 不負責排序: 如果輸入未排序,應該在規劃階段插入 SortExec
  3. 排序表達式: left_sort_exprsright_sort_exprs 記錄了預期的排序方式

Streamed 側與 Buffered 側

Sort-Merge Join 使用一個 Streamed 側一個 Buffered 側:

Streamed 側:
  - 逐行順序掃描
  - 當前只保留少量行在記憶體中
  - 不支援 spilling(因為數據量小)

Buffered 側:
  - 緩衝所有具有相同連接鍵值的行
  - 可能需要同時保留多個 batch
  - 支援 spilling 到磁碟(如果記憶體不足)

為何需要 Buffering?

考慮有重複連接鍵的情況:

Streamed 側:         Buffered 側:
user_id | name       user_id | order_id
--------+-------     --------+----------
1       | Alice      1       | 101
2       | Bob        1       | 102
                     1       | 103  ← 多個 user_id=1 的訂單
                     2       | 201
                     2       | 202

處理 Streamed user_id=1 (Alice) 時:
  需要與 Buffered 側的所有 user_id=1 的行進行匹配
  → 必須緩衝 101, 102, 103 三行
  → 產生 3 個結果行

處理完 user_id=1 後:
  可以釋放緩衝的 101, 102, 103
  → 記憶體被回收

哪一側是 Streamed/Buffered?

根據 Join 類型選擇:

// datafusion/physical-plan/src/joins/sort_merge_join/exec.rs

/// 決定哪一側作為 probe side (streamed)
fn probe_side(join_type: &JoinType) -> JoinSide {
    match join_type {
        JoinType::Inner | JoinType::Right | JoinType::RightSemi 
        | JoinType::RightAnti => {
            JoinSide::Left  // 左側作為 streamed
        }
        JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
            JoinSide::Right  // 右側作為 streamed
        }
        JoinType::Full => {
            JoinSide::Left  // 默認左側作為 streamed
        }
    }
}

選擇原則: 對於 LEFT OUTER JOIN,左側必須完整輸出,所以選擇左側作為 streamed 側(逐行處理,確保每行都被處理)。

歸併邏輯的詳細流程

讓我們深入理解歸併的狀態機:

// 簡化的 Sort-Merge Join 執行邏輯
async fn sort_merge_join(
    mut streamed: SendableRecordBatchStream,
    mut buffered: SendableRecordBatchStream,
    join_type: JoinType,
) -> Result<Vec<RecordBatch>> {
    let mut output = Vec::new();
    let mut buffered_data = BufferedData::new();
    
    // 讀取第一個 streamed batch
    let mut streamed_batch = streamed.next().await?;
    let mut streamed_idx = 0;
    
    // 讀取第一個 buffered batch
    let mut buffered_batch = buffered.next().await?;
    let mut buffered_idx = 0;
    
    loop {
        // 如果任一側耗盡,根據 join 類型決定是否繼續
        if streamed_batch.is_none() || buffered_batch.is_none() {
            break;
        }
        
        let streamed_key = get_join_key(&streamed_batch, streamed_idx);
        let buffered_key = get_join_key(&buffered_batch, buffered_idx);
        
        match streamed_key.cmp(&buffered_key) {
            Ordering::Equal => {
                // 找到匹配!
                // 緩衝所有 buffered 側具有相同鍵值的行
                buffered_data.clear();
                buffered_data.push(buffered_batch, buffered_idx);
                
                // 繼續掃描 buffered 側,收集所有相同鍵值的行
                loop {
                    buffered_idx += 1;
                    if buffered_idx >= buffered_batch.num_rows() {
                        // 當前 batch 耗盡,讀取下一個
                        buffered_batch = buffered.next().await?;
                        if buffered_batch.is_none() {
                            break;
                        }
                        buffered_idx = 0;
                    }
                    
                    let next_key = get_join_key(&buffered_batch, buffered_idx);
                    if next_key != streamed_key {
                        break;  // 不再相等,停止緩衝
                    }
                    buffered_data.push(buffered_batch, buffered_idx);
                }
                
                // 產生輸出: streamed 行 × buffered 中所有匹配的行
                for buffered_row in buffered_data.rows() {
                    let output_row = join_rows(
                        &streamed_batch, 
                        streamed_idx, 
                        buffered_row
                    );
                    output.push(output_row);
                }
                
                // 推進 streamed 指針
                streamed_idx += 1;
                if streamed_idx >= streamed_batch.num_rows() {
                    streamed_batch = streamed.next().await?;
                    streamed_idx = 0;
                }
            }
            Ordering::Less => {
                // streamed_key < buffered_key
                // Streamed 側的當前值沒有匹配,推進 streamed
                if join_type == JoinType::Left {
                    // LEFT JOIN: 需要輸出未匹配的 streamed 行
                    output.push(join_with_nulls(&streamed_batch, streamed_idx));
                }
                
                streamed_idx += 1;
                if streamed_idx >= streamed_batch.num_rows() {
                    streamed_batch = streamed.next().await?;
                    streamed_idx = 0;
                }
            }
            Ordering::Greater => {
                // streamed_key > buffered_key
                // Buffered 側的當前值沒有匹配,推進 buffered
                if join_type == JoinType::Right || join_type == JoinType::Full {
                    // RIGHT/FULL JOIN: 需要輸出未匹配的 buffered 行
                    output.push(join_with_nulls_buffered(&buffered_batch, buffered_idx));
                }
                
                buffered_idx += 1;
                if buffered_idx >= buffered_batch.num_rows() {
                    buffered_batch = buffered.next().await?;
                    buffered_idx = 0;
                }
            }
        }
    }
    
    Ok(output)
}

處理重複鍵值的案例

讓我們通過一個詳細例子理解緩衝機制:

Streamed 側:              Buffered 側:
user_id | name           user_id | product | amount
--------+-------         --------+---------+--------
1       | Alice          1       | Book    | 100
1       | Alice2         1       | Pen     | 50
2       | Bob            1       | Paper   | 30
3       | Carol          2       | Book    | 200
                         2       | Pencil  | 80
                         4       | Eraser  | 20

執行過程:

【步驟 1】
  Streamed: user_id=1 (Alice)
  Buffered: user_id=1 (Book)
  → 相等!
  
  開始緩衝 Buffered 側所有 user_id=1 的行:
    buffered_data = [
      (1, Book, 100),
      (1, Pen, 50),
      (1, Paper, 30)
    ]
  
  產生輸出:
    (1, Alice, Book, 100)
    (1, Alice, Pen, 50)
    (1, Alice, Paper, 30)
  
  Buffered 指針現在指向 (2, Book, 200)
  推進 Streamed 指針

【步驟 2】
  Streamed: user_id=1 (Alice2)
  Buffered: user_id=2 (Book)
  → Streamed < Buffered
  
  但等等! buffered_data 中還有 user_id=1 的緩衝數據
  → 重用緩衝數據(無需重新掃描)
  
  產生輸出:
    (1, Alice2, Book, 100)
    (1, Alice2, Pen, 50)
    (1, Alice2, Paper, 30)
  
  推進 Streamed 指針,釋放 buffered_data

【步驟 3】
  Streamed: user_id=2 (Bob)
  Buffered: user_id=2 (Book)
  → 相等!
  
  緩衝 user_id=2 的行:
    buffered_data = [
      (2, Book, 200),
      (2, Pencil, 80)
    ]
  
  產生輸出:
    (2, Bob, Book, 200)
    (2, Bob, Pencil, 80)
  
  推進兩側指針

【步驟 4】
  Streamed: user_id=3 (Carol)
  Buffered: user_id=4 (Eraser)
  → Streamed < Buffered
  
  對於 INNER JOIN,不輸出
  對於 LEFT JOIN,輸出 (3, Carol, NULL, NULL)
  
  推進 Streamed 指針

【步驟 5】
  Streamed 已耗盡,結束

關鍵洞察:

  • 緩衝數據可以被重用,如果 Streamed 側有多個相同鍵值的行
  • 緩衝的粒度是連接鍵值,而不是整個表
  • 當連接鍵值改變時,緩衝的數據被釋放,記憶體得以回收

Nested Loop Join - 最基礎的 Join 算法

在討論 Join 策略選擇之前,我們還需要了解第三種 Join 算法:Nested Loop Join(嵌套循環連接)。

什麼時候需要 Nested Loop Join?

Nested Loop Join 用於沒有等值連接條件的情況:

-- 場景 1: 非等值連接
SELECT * 
FROM products p
JOIN orders o ON o.price > p.cost * 1.2;

-- 場景 2: 複雜的連接條件
SELECT * 
FROM employees e1
JOIN employees e2 ON e2.salary BETWEEN e1.salary * 0.8 AND e1.salary * 1.2;

-- 場景 3: CROSS JOIN + Filter
SELECT * 
FROM table1 t1, table2 t2
WHERE t1.col1 + t2.col2 > 100;

這些查詢無法使用 Hash Join 或 Sort-Merge Join,因為:

  • Hash Join 需要等值條件(=)來構建 Hash Table
  • Sort-Merge Join 也需要等值條件來進行歸併

Nested Loop Join 的實現

DataFusion 的 NestedLoopJoinExec 實現:

// datafusion/physical-plan/src/joins/nested_loop_join.rs

/// Nested Loop Join: 適用於沒有等值連接條件的場景
/// 
/// 執行流程:
/// 1. 緩衝整個左側(Build 側)到記憶體
/// 2. 對右側(Probe 側)的每一行:
///    - 與左側的所有行進行笛卡爾積
///    - 評估 join filter
///    - 產生匹配的結果
pub struct NestedLoopJoinExec {
    /// 左側(build 側)
    pub(crate) left: Arc<dyn ExecutionPlan>,
    
    /// 右側(probe 側)
    pub(crate) right: Arc<dyn ExecutionPlan>,
    
    /// 連接過濾條件(必須有,否則是純 CROSS JOIN)
    pub(crate) filter: Option<JoinFilter>,
    
    /// Join 類型
    pub(crate) join_type: JoinType,
    
    /// Future 用於非同步載入左側數據
    build_side_data: OnceAsync<JoinLeftData>,
    
    // ... 其他欄位
}

執行邏輯:

// 簡化的 Nested Loop Join 實現
async fn nested_loop_join(
    build_input: SendableRecordBatchStream,
    probe_input: SendableRecordBatchStream,
    filter: JoinFilter,
) -> Result<Vec<RecordBatch>> {
    // 階段 1: 緩衝整個 Build 側
    let mut build_batches = Vec::new();
    while let Some(batch) = build_input.next().await {
        build_batches.push(batch?);
    }
    let build_data = concat_batches(&schema, &build_batches)?;
    
    let mut output = Vec::new();
    
    // 階段 2: 對每個 Probe batch
    while let Some(probe_batch) = probe_input.next().await {
        let probe_batch = probe_batch?;
        
        // 對 Probe batch 的每一行
        for probe_idx in 0..probe_batch.num_rows() {
            // 與 Build 側的每一行進行組合
            for build_idx in 0..build_data.num_rows() {
                // 創建組合行
                let combined = combine_rows(
                    &build_data, build_idx,
                    &probe_batch, probe_idx
                );
                
                // 評估 filter
                if filter.evaluate(&combined)? {
                    output.push(combined);
                }
            }
        }
    }
    
    Ok(output)
}

時間複雜度: O(N × M) - 每個 left 行都要與每個 right 行比較

適用場景:

  • 沒有等值連接條件
  • Build 側非常小(幾千行以內)
  • 無其他選擇

Join 策略選擇 - DataFusion 的決策邏輯

現在我們了解了三種 Join 算法:

  • Hash Join: 適合大多數等值連接
  • Sort-Merge Join: 適合已排序的輸入或記憶體受限場景
  • Nested Loop Join: 適合非等值連接或極小的表

那麼,DataFusion 如何在它們之間做出選擇?

物理規劃器的選擇邏輯

讓我們詳細分析 physical_planner.rs 中的決策流程:

// datafusion/core/src/physical_planner.rs

// 從 LogicalPlan::Join 生成物理計劃
LogicalPlan::Join(Join { 
    left, right, on, filter, join_type, .. 
}) => {
    let join_on = /* 提取等值連接鍵 */;
    let join_filter = /* 提取非等值過濾條件 */;
    
    // 決策樹開始
    
    // 【決策 1】是否有等值連接鍵?
    if join_on.is_empty() {
        // 沒有等值連接鍵
        if join_filter.is_none() && join_type == JoinType::Inner {
            // 純 CROSS JOIN,使用專用的 CrossJoinExec
            return CrossJoinExec::new(left, right);
        } else {
            // 有非等值條件,使用 NestedLoopJoin
            return NestedLoopJoinExec::try_new(
                left, right, join_filter, join_type
            );
        }
    }
    
    // 有等值連接鍵,繼續決策
    
    // 讀取配置
    let prefer_hash_join = config.prefer_hash_join;  // 默認 true
    let repartition_joins = config.repartition_joins;
    let target_partitions = config.target_partitions;
    
    // 【決策 2】是否啟用並行?
    let can_parallel = target_partitions > 1 && repartition_joins;
    
    if can_parallel && !prefer_hash_join {
        // 【策略 A】啟用並行 + 偏好 SortMergeJoin
        return SortMergeJoinExec::try_new(
            left, right, join_on, join_filter, join_type
        );
    } else if can_parallel && prefer_hash_join {
        // 【策略 B】啟用並行 + 偏好 HashJoin(最常見)
        return HashJoinExec::try_new(
            left, right, join_on, join_filter, join_type,
            PartitionMode::Auto  // 自動選擇分區策略
        );
    } else {
        // 【策略 C】不並行,使用單分區 HashJoin
        return HashJoinExec::try_new(
            left, right, join_on, join_filter, join_type,
            PartitionMode::CollectLeft  // 收集 left 到單一分區
        );
    }
}

配置參數的影響

DataFusion 提供了幾個關鍵配置來控制 Join 策略:

// datafusion/common/src/config.rs

// 1. prefer_hash_join (默認: true)
/// 當設為 true 時,物理規劃器偏好 HashJoin 而非 SortMergeJoin
/// HashJoin 通常更高效,但消耗更多記憶體
pub prefer_hash_join: bool, default = true

// 2. repartition_joins (默認: true)
/// 是否允許為 Join 重新分區數據
/// 如果 false,則使用單分區模式
pub repartition_joins: bool, default = true

// 3. target_partitions (默認: CPU 核心數)
/// 目標並行度
pub target_partitions: usize, default = num_cpus

// 4. hash_join_single_partition_threshold (默認: 1MB)
/// HashJoin 使用單分區模式的閾值
/// 如果 Build 側小於此值,使用 CollectLeft 模式
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

物理優化器的進一步調整

物理規劃器生成初始計劃後,物理優化器會進一步優化:

優化 1: EnforceDistribution 規則

確保 Join 兩側滿足分區要求:

// 偽代碼
if join.mode == PartitionMode::Partitioned {
    // SortMergeJoin 或 Partitioned HashJoin
    // 需要兩側按相同的 hash 值分區
    
    if !left.satisfies_hash_partitioning(join_keys) {
        // 插入 RepartitionExec
        left = RepartitionExec::try_new(
            left, 
            Partitioning::Hash(join_keys, target_partitions)
        );
    }
    
    if !right.satisfies_hash_partitioning(join_keys) {
        right = RepartitionExec::try_new(
            right,
            Partitioning::Hash(join_keys, target_partitions)
        );
    }
}

優化 2: EnforceSorting 規則

如果選擇了 SortMergeJoin,確保輸入已排序:

// pseudocode
if let SortMergeJoinExec { left, right, on, .. } = join {
    if !left.output_ordering().satisfies(on) {
        // 插入 SortExec
        left = SortExec::new(left, on);
    }
    
    if !right.output_ordering().satisfies(on) {
        right = SortExec::new(right, on);
    }
}

實際案例分析

讓我們通過幾個實際場景來理解策略選擇:

案例 1: 典型的 OLAP 查詢

SELECT 
    p.product_name,
    SUM(o.amount) as total_sales
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
GROUP BY p.product_name;

分析:

  • orders 表: 1000 萬行,2 GB
  • products 表: 10 萬行,20 MB
  • 有等值連接條件: o.product_id = p.product_id
  • 數據未排序

DataFusion 的選擇:

HashJoinExec (PartitionMode::CollectLeft)
├─ products (Build 側,收集到單分區)
│  └─ TableScan
└─ orders (Probe 側,保持分區)
   └─ FilterExec (order_date >= '2024-01-01')
      └─ TableScan

原因:

  1. products 很小(20 MB < 1MB 閾值?實際上大於,但相對 orders 很小)
  2. 有等值連接,prefer_hash_join=true
  3. products 作為 Build 側,構建 Hash Table 快速且記憶體開銷小

案例 2: 兩個大表 Join

SELECT *
FROM huge_fact_table1 f1
JOIN huge_fact_table2 f2 ON f1.key = f2.key;

分析:

  • fact_table1: 10 億行,50 GB
  • fact_table2: 8 億行,40 GB
  • 兩個表都很大
  • target_partitions = 16

DataFusion 的選擇:

HashJoinExec (PartitionMode::Partitioned)
├─ RepartitionExec (Hash(key), 16 partitions)
│  └─ fact_table1
└─ RepartitionExec (Hash(key), 16 partitions)
   └─ fact_table2

原因:

  1. 兩個表都很大,無法使用 CollectLeft
  2. 使用 Partitioned 模式,每個分區獨立處理
  3. 假設每個分區約 3 GB (50GB / 16),可以放入記憶體

案例 3: 已排序的時序數據

-- 兩個表都已按 timestamp 排序
SELECT *
FROM sensor_data_2023 s1
JOIN sensor_data_2024 s2 
  ON s1.sensor_id = s2.sensor_id
WHERE s1.timestamp BETWEEN '2023-12-01' AND '2023-12-31'
  AND s2.timestamp BETWEEN '2024-01-01' AND '2024-01-31'
ORDER BY s1.sensor_id, s1.timestamp;

分析:

  • 兩個表都已按 sensor_id 排序
  • 查詢結果需要排序輸出
  • 如果使用 Hash Join,需要額外的排序步驟

理想的計劃(如果設置 prefer_hash_join=false):

SortMergeJoinExec
├─ FilterExec (timestamp BETWEEN ...)
│  └─ TableScan (sensor_data_2023)
└─ FilterExec (timestamp BETWEEN ...)
   └─ TableScan (sensor_data_2024)

優勢:

  1. 利用已有排序,無需 SortExec
  2. 輸出自然保持排序,滿足 ORDER BY
  3. 流式處理,記憶體友善

但實際上,由於 prefer_hash_join=true,DataFusion 可能仍選擇 HashJoin,除非用戶手動配置。

案例 4: 非等值連接

SELECT *
FROM employees e1
JOIN employees e2 
  ON e2.salary BETWEEN e1.min_salary AND e1.max_salary;

DataFusion 的選擇:

NestedLoopJoinExec
├─ employees (左側,較小)
└─ employees (右側)

原因: 沒有等值連接條件,只能使用 Nested Loop Join

小結

今天我們完成了 Join 算子的完整探討,學習了三種核心 Join 算法及其選擇策略。

Sort-Merge Join 的關鍵特點

  1. 前提要求: 兩個輸入必須已按連接鍵排序
  2. 執行模型:
    • Streamed 側逐行掃描
    • Buffered 側緩衝相同鍵值的所有行
    • 歸併過程類似 Merge Sort
  3. 記憶體友善: 只緩衝當前鍵值的行,而非整個表
  4. 流式處理: 可以逐步產生結果,無需等待完整 Build Phase
  5. 保持排序: 輸出自然保持排序,對後續操作有利

三種 Join 算法的定位

算法 核心優勢 主要限制 最佳場景
Hash Join 快速查找 O(1)高並行度 需要構建 Hash Table記憶體消耗較大 大多數等值連接記憶體充足數據未排序
Sort-Merge Join 利用已有排序記憶體友善保持排序 需要預排序未排序時開銷大 數據已排序記憶體受限需要排序輸出
Nested Loop Join 支援任意條件實現簡單 O(N×M) 複雜度性能較差 非等值連接Build 側極小無其他選擇

DataFusion 的選擇策略

  1. 默認偏好 Hash Join: prefer_hash_join=true,適合大多數 OLAP 場景
  2. 智能分區模式:
    • CollectLeft: Build 側較小,收集到單分區
    • Partitioned: 兩側都大,按 hash 分區並行處理
  3. 自動插入輔助算子:
    • RepartitionExec: 確保分區對齊
    • SortExec: 為 Sort-Merge Join 排序輸入
  4. 基於統計的優化: 選擇較小的表作為 Build 側

至此,Join 算子的介紹算是告一個段落了。明天,我們會轉向另一個主題:數據源整合 Part 1 - TableProvider 機制,探討 DataFusion 如何抽象和統一各種數據源的訪問。

參考資料

  1. DataFusion SortMergeJoinExec 原始碼
  2. SortMergeJoinStream 實現
  3. NestedLoopJoinExec 原始碼
  4. Physical Planner Join 選擇邏輯
  5. 配置選項
  6. Sort-Merge Join 演算法(維基百科)
  7. Sort-Merge Join 視覺化演示
  8. Join Algorithms in Database Systems
  9. PostgreSQL Join Methods
  10. Volcano Query Evaluation Model

上一篇
Day 21: Join 算子 Part 1 - Hash Join 原理
下一篇
Day 23: 數據源整合 Part 1 - TableProvider 機制
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言